In [1]:
#pip install dask
#pip install dask-ml
import plotly.io as pio
#pio.renderers.default='notebook'
In [2]:
import dask
import dask.dataframe as dd
from time import time
import plotly.express as px
import pandas as pd
import statsmodels.api as sm
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from time import time
import multiprocessing as mp
import dask.dataframe as dd
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from time import time
from dask_ml.linear_model import LinearRegression
from scipy.stats.mstats import winsorize
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
C:\Users\K\AppData\Local\Temp\ipykernel_4376\1015654004.py:2: DeprecationWarning: The current Dask DataFrame implementation is deprecated. 
In a future release, Dask DataFrame will use a new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

    $ pip install dask-expr

and turning the query planning option on:

    >>> import dask
    >>> dask.config.set({'dataframe.query-planning': True})
    >>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues 

To disable this warning in the future, set dask config:

    # via Python
    >>> dask.config.set({'dataframe.query-planning-warning': False})

    # via CLI
    dask config set dataframe.query-planning-warning False


  import dask.dataframe as dd

Data Processing¶

In [3]:
%%time
## Load full trip data
data_full = dd.read_csv("data/Trips_Full Data.csv", assume_missing=True)
#load trip data by distance
data_dist = dd.read_csv("data/Trips_by_Distance.csv", assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
CPU times: total: 78.1 ms
Wall time: 141 ms
In [4]:
# Check for missing values
missing_full = data_full.isnull().sum().compute()
missing_dist = data_dist.isnull().sum().compute()
In [5]:
#number missing values per variable
print("Missing values in full trip data:")
print(missing_full)

print("\nMissing values in trip data by distance:")
print(missing_dist)
Missing values in full trip data:
Month of Date                 0
Week of Date                  0
Year of Date                  0
Level                         0
Date                          0
Week Ending Date              0
Trips <1 Mile                 0
People Not Staying at Home    0
Population Staying at Home    0
Trips                         0
Trips 1-25 Miles              0
Trips 1-3 Miles               0
Trips 10-25 Miles             0
Trips 100-250 Miles           0
Trips 100+ Miles              0
Trips 25-100 Miles            0
Trips 25-50 Miles             0
Trips 250-500 Miles           0
Trips 3-5 Miles               0
Trips 5-10 Miles              0
Trips 50-100 Miles            0
Trips 500+ Miles              0
dtype: int64

Missing values in trip data by distance:
Level                                 0
Date                                  0
State FIPS                          901
State Postal Code                   901
County FIPS                       46852
County Name                       46852
Population Staying at Home        12950
Population Not Staying at Home    12950
Number of Trips                   12950
Number of Trips <1                12950
Number of Trips 1-3               12950
Number of Trips 3-5               12950
Number of Trips 5-10              12950
Number of Trips 10-25             12950
Number of Trips 25-50             12950
Number of Trips 50-100            12950
Number of Trips 100-250           12950
Number of Trips 250-500           12950
Number of Trips >=500             12950
Row ID                                0
Week                                  0
Month                                 0
dtype: int64
In [6]:
%%time
# Drop columns with more than 50% missing entries
threshold = 0.5
columns_to_drop_full = missing_full[missing_full > threshold * len(data_full)].index
columns_to_drop_dist = missing_dist[missing_dist > threshold * len(data_dist)].index

data_full = data_full.drop(columns=columns_to_drop_full)
data_dist = data_dist.drop(columns=columns_to_drop_dist)
CPU times: total: 4.06 s
Wall time: 8.75 s
In [7]:
# Drop rows with missing data
data_full = data_full.dropna().reset_index()
data_dist = data_dist.dropna().reset_index()
In [8]:
%%time
# Display info after dropping columns and rows
print("\nInfo after dropping columns and rows:")
print("Full trip data:")
print(data_full.compute().info())

print("\nTrip data by distance:")
print(data_dist.compute().info())
Info after dropping columns and rows:
Full trip data:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 23 columns):
 #   Column                      Non-Null Count  Dtype  
---  ------                      --------------  -----  
 0   index                       7 non-null      int64  
 1   Month of Date               7 non-null      string 
 2   Week of Date                7 non-null      string 
 3   Year of Date                7 non-null      float64
 4   Level                       7 non-null      string 
 5   Date                        7 non-null      string 
 6   Week Ending Date            7 non-null      string 
 7   Trips <1 Mile               7 non-null      float64
 8   People Not Staying at Home  7 non-null      float64
 9   Population Staying at Home  7 non-null      float64
 10  Trips                       7 non-null      float64
 11  Trips 1-25 Miles            7 non-null      float64
 12  Trips 1-3 Miles             7 non-null      float64
 13  Trips 10-25 Miles           7 non-null      float64
 14  Trips 100-250 Miles         7 non-null      float64
 15  Trips 100+ Miles            7 non-null      float64
 16  Trips 25-100 Miles          7 non-null      float64
 17  Trips 25-50 Miles           7 non-null      float64
 18  Trips 250-500 Miles         7 non-null      float64
 19  Trips 3-5 Miles             7 non-null      float64
 20  Trips 5-10 Miles            7 non-null      float64
 21  Trips 50-100 Miles          7 non-null      float64
 22  Trips 500+ Miles            7 non-null      float64
dtypes: float64(17), int64(1), string(5)
memory usage: 1.7 KB
None

Trip data by distance:
<class 'pandas.core.frame.DataFrame'>
Index: 988773 entries, 0 to 515015
Data columns (total 23 columns):
 #   Column                          Non-Null Count   Dtype  
---  ------                          --------------   -----  
 0   index                           988773 non-null  int64  
 1   Level                           988773 non-null  string 
 2   Date                            988773 non-null  string 
 3   State FIPS                      988773 non-null  float64
 4   State Postal Code               988773 non-null  string 
 5   County FIPS                     988773 non-null  float64
 6   County Name                     988773 non-null  string 
 7   Population Staying at Home      988773 non-null  float64
 8   Population Not Staying at Home  988773 non-null  float64
 9   Number of Trips                 988773 non-null  float64
 10  Number of Trips <1              988773 non-null  float64
 11  Number of Trips 1-3             988773 non-null  float64
 12  Number of Trips 3-5             988773 non-null  float64
 13  Number of Trips 5-10            988773 non-null  float64
 14  Number of Trips 10-25           988773 non-null  float64
 15  Number of Trips 25-50           988773 non-null  float64
 16  Number of Trips 50-100          988773 non-null  float64
 17  Number of Trips 100-250         988773 non-null  float64
 18  Number of Trips 250-500         988773 non-null  float64
 19  Number of Trips >=500           988773 non-null  float64
 20  Row ID                          988773 non-null  string 
 21  Week                            988773 non-null  float64
 22  Month                           988773 non-null  float64
dtypes: float64(17), int64(1), string(5)
memory usage: 226.7 MB
None
CPU times: total: 5.55 s
Wall time: 10 s
Check if there are any outliers¶
In [9]:
fig = px.box(data_dist.compute(), y='Population Staying at Home', title='Box Plot of Trips')

# Update layout for better visualization
fig.update_layout(yaxis_title='Trips', boxmode='group')
# display figure
fig.update_layout(height=600, width=1200)
# Display the figure
fig.show()

There are some outliers in the data. To handle outleirs, we will apply winsorization (https://www.geeksforgeeks.org/winsorization/)

1A - Data Aggregation (People staying at home)¶

In [10]:
def process_data_dask(file_path, num_workers):
    # Set the Dask scheduler and number of workers
    dask.config.set(scheduler='processes', num_workers=num_workers)

    # Read CSV file into a Dask DataFrame
    start = time()
    air = dd.read_csv(file_path, assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
    #process the data
    # Drop columns with more than 50% missing entries
    missing = air.isnull().sum().compute()
    threshold = 0.5
    columns_to_drop = missing[missing > threshold * len(air)].index
    
    air = air.drop(columns=columns_to_drop)
    air = air.dropna().reset_index().compute()
    # Winsorize outliers for numeric columns
    numeric_columns = air.select_dtypes(include='number').columns
    #air[numeric_columns] = air[numeric_columns].map(winsorize, axis=0)

    # Drop rows with missing values after winsorization
    air = air.dropna().reset_index(drop=True)
    read_time = round(time() - start, 2)

    # Perform groupby and compute mean
    start = time()
    by_week = air.groupby('Week')['Population Staying at Home'].mean()
    result = round(by_week, 2)
    groupby_time = round(time() - start, 2)

    return result, read_time, groupby_time

def process_data_sequential(file_path):
    dask.config.set(scheduler='synchronous')
    # Read CSV file into a Dask DataFrame
    start = time()
    air = dd.read_csv(file_path, assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
    #process the data
    # Drop columns with more than 50% missing entries
    missing = air.isnull().sum().compute()
    threshold = 0.5
    columns_to_drop = missing[missing > threshold * len(air)].index
    
    air = air.drop(columns=columns_to_drop)
    air = air.dropna().reset_index().compute()
    # Winsorize outliers for numeric columns
    numeric_columns = air.select_dtypes(include='number').columns
    #air[numeric_columns] = air[numeric_columns].map(winsorize, axis=0)

    # Drop rows with missing values after winsorization
    air = air.dropna().reset_index(drop=True)
    read_time = round(time() - start, 2)

    # Perform groupby and compute mean
    start = time()
    by_week = air.groupby('Week')['Population Staying at Home'].mean()
    result = round(by_week, 2)
    groupby_time = round(time() - start, 2)

    return result, read_time, groupby_time

def main_dask():
    file_path = "data/Trips_by_Distance.csv"
    # Process data sequentially using Dask
    result_seq, read_time_seq, groupby_time_seq = process_data_sequential(file_path)
    total_time_seq_1a = read_time_seq + groupby_time_seq
    print("Results with sequential processing:")
    display(result_seq)
    print("Total Time: {} seconds".format(total_time_seq_1a))
    print("Read Time: {} seconds".format(read_time_seq))
    print("Groupby Time: {} seconds".format(groupby_time_seq))
    print("Results with 10 processors:")
    # Process data using Dask with 10 processors
    result_10, read_time_10, groupby_time_10 = process_data_dask(file_path, num_workers=10)
    total_time_dask_10_1a = read_time_10 + groupby_time_10

    print("\nResults with 10 processors:")
    display(result_10)
    print("Total Time: {} seconds".format(total_time_dask_10_1a))
    print("Read Time: {} seconds".format(read_time_10))
    print("Groupby Time: {} seconds".format(groupby_time_10))

    # Process data using Dask with 20 processors
    result_20, read_time_20, groupby_time_20 = process_data_dask(file_path, num_workers=20)
    total_time_dask_20_1a = read_time_20 + groupby_time_20

    print("\nResults with 20 processors:")
    display(result_20)
    print("Total Time: {} seconds".format(total_time_dask_20_1a))
    print("Read Time: {} seconds".format(read_time_20))
    print("Groupby Time: {} seconds".format(groupby_time_20))
    #return one aggregation result for visualization
    return result_20, total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a

if __name__ == '__main__':
    results, total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a = main_dask()
Results with sequential processing:
Week
0.0     20976.79
1.0     20071.31
2.0     19897.34
3.0     19889.44
4.0     20096.31
5.0     19757.43
6.0     19599.07
7.0     20033.60
8.0     19741.67
9.0     20043.10
10.0    19826.90
11.0    20264.39
12.0    20632.23
13.0    20792.38
14.0    21064.02
15.0    21375.30
16.0    20066.29
17.0    19666.74
18.0    20020.24
19.0    19679.55
20.0    19881.39
21.0    20060.82
22.0    19897.75
23.0    20503.63
24.0    20571.77
25.0    20088.39
26.0    20823.99
27.0    20249.93
28.0    20038.48
29.0    20758.97
30.0    20662.34
31.0    20285.02
32.0    19052.69
33.0    18745.05
34.0    18890.03
35.0    20627.34
36.0    19918.28
37.0    20077.21
38.0    20648.27
39.0    21323.44
40.0    21132.91
41.0    21656.94
42.0    21958.42
43.0    21998.57
44.0     7465.21
Name: Population Staying at Home, dtype: float64
Total Time: 39.29 seconds
Read Time: 39.21 seconds
Groupby Time: 0.08 seconds
Results with 10 processors:

Results with 10 processors:
Week
0.0     20976.79
1.0     20071.31
2.0     19897.34
3.0     19889.44
4.0     20096.31
5.0     19757.43
6.0     19599.07
7.0     20033.60
8.0     19741.67
9.0     20043.10
10.0    19826.90
11.0    20264.39
12.0    20632.23
13.0    20792.38
14.0    21064.02
15.0    21375.30
16.0    20066.29
17.0    19666.74
18.0    20020.24
19.0    19679.55
20.0    19881.39
21.0    20060.82
22.0    19897.75
23.0    20503.63
24.0    20571.77
25.0    20088.39
26.0    20823.99
27.0    20249.93
28.0    20038.48
29.0    20758.97
30.0    20662.34
31.0    20285.02
32.0    19052.69
33.0    18745.05
34.0    18890.03
35.0    20627.34
36.0    19918.28
37.0    20077.21
38.0    20648.27
39.0    21323.44
40.0    21132.91
41.0    21656.94
42.0    21958.42
43.0    21998.57
44.0     7465.21
Name: Population Staying at Home, dtype: float64
Total Time: 52.900000000000006 seconds
Read Time: 52.84 seconds
Groupby Time: 0.06 seconds

Results with 20 processors:
Week
0.0     20976.79
1.0     20071.31
2.0     19897.34
3.0     19889.44
4.0     20096.31
5.0     19757.43
6.0     19599.07
7.0     20033.60
8.0     19741.67
9.0     20043.10
10.0    19826.90
11.0    20264.39
12.0    20632.23
13.0    20792.38
14.0    21064.02
15.0    21375.30
16.0    20066.29
17.0    19666.74
18.0    20020.24
19.0    19679.55
20.0    19881.39
21.0    20060.82
22.0    19897.75
23.0    20503.63
24.0    20571.77
25.0    20088.39
26.0    20823.99
27.0    20249.93
28.0    20038.48
29.0    20758.97
30.0    20662.34
31.0    20285.02
32.0    19052.69
33.0    18745.05
34.0    18890.03
35.0    20627.34
36.0    19918.28
37.0    20077.21
38.0    20648.27
39.0    21323.44
40.0    21132.91
41.0    21656.94
42.0    21958.42
43.0    21998.57
44.0     7465.21
Name: Population Staying at Home, dtype: float64
Total Time: 51.82 seconds
Read Time: 51.77 seconds
Groupby Time: 0.05 seconds
In [11]:
fig = px.line(pd.DataFrame(results).reset_index(), x='Week', y='Population Staying at Home', title='Average Number of People by Week')
#layout for better visualization
fig.update_layout(xaxis_title='Week', yaxis_title='Average Number of People Staying at Home by Week')
# figure
fig.update_layout(height=600, width=1200)
#line color
fig.update_traces(line=dict(color='steelblue'))
fig.show()

1B - Data Aggregation (How far people travel)¶

In [12]:
# Function to read the data
def read_data(file_path):
    start = time()
    air = dd.read_csv(file_path, assume_missing=True , dtype={'County Name': 'object', 'State Postal Code': 'object'})
    #process the data
    # Drop columns with more than 50% missing entries
    missing = air.isnull().sum().compute()
    threshold = 0.5
    columns_to_drop = missing[missing > threshold * len(air)].index
    
    air = air.drop(columns=columns_to_drop)
    air = air.dropna().reset_index()
    # Winsorize outliers for numeric columns
    numeric_columns = air.select_dtypes(include='number').columns


    # Drop rows with missing values after winsorization
    air = air.dropna().reset_index(drop=True)
    
    read_time = round(time() - start, 2)
    return air, read_time

def group_week_dask(data, num_workers):
    dask.config.set(scheduler='processes', num_workers=num_workers)
    start = time()
    # Extracting week numbers from 'Week Ending Date' column
    data = data.assign(Week = lambda x: data['Week of Date'].str.extract('(\d+)'))
    # Group by 'Week' and calculate the mean for selected columns
    averages = data.groupby("Week")[['Trips 1-25 Miles',
                                      'Trips 1-3 Miles', 'Trips 10-25 Miles', 'Trips 100-250 Miles',
                                      'Trips 100+ Miles', 'Trips 25-100 Miles', 'Trips 25-50 Miles',
                                      'Trips 250-500 Miles', 'Trips 3-5 Miles', 'Trips 5-10 Miles',
                                      'Trips 50-100 Miles', 'Trips 500+ Miles']].mean()
    result = averages.compute()
    groupby_time = round(time() - start, 2)
    return result, groupby_time




def group_week_dask_sequential(data):
    dask.config.set(scheduler='synchronous')
    start = time()
    # Extracting week numbers from 'Week Ending Date' column
    data = data.assign(Week = lambda x: data['Week of Date'].str.extract('(\d+)'))
    # Group by 'Week' and calculate the mean for selected columns
    averages = data.groupby("Week")[['Trips 1-25 Miles',
                                      'Trips 1-3 Miles', 'Trips 10-25 Miles', 'Trips 100-250 Miles',
                                      'Trips 100+ Miles', 'Trips 25-100 Miles', 'Trips 25-50 Miles',
                                      'Trips 250-500 Miles', 'Trips 3-5 Miles', 'Trips 5-10 Miles',
                                      'Trips 50-100 Miles', 'Trips 500+ Miles']].mean()
    result = averages.compute()
    groupby_time = round(time() - start, 2)
    return result, groupby_time

def compare_performance(file_path):
    # Read data sequentially
    air_seq, read_time_seq = read_data(file_path)
    result_seq, groupby_time_seq = group_week_dask_sequential(air_seq)
    total_time_seq_1b = read_time_seq + groupby_time_seq

    print("Results with sequential processing:")
    display(round(result_seq))
    print("Total Time: {} seconds".format(total_time_seq_1b))
    print("Read Time: {} seconds".format(read_time_seq))
    print("Groupby Time: {} seconds".format(groupby_time_seq))

    # Read data using Dask with 10 processors
    air_dask_10, read_time_dask_10 = read_data(file_path)
    result_dask_10, groupby_time_dask_10 = group_week_dask(air_dask_10, num_workers=10)
    total_time_dask_10 = read_time_dask_10 + groupby_time_dask_10

    print("\nResults with Dask processing (10 processors):")
    display(round(result_dask_10))
    print("Total Time: {} seconds".format(total_time_dask_10))
    print("Read Time: {} seconds".format(read_time_dask_10))
    print("Groupby Time: {} seconds".format(groupby_time_dask_10))

    # Read data using Dask with 20 processors
    air_dask_20, read_time_dask_20 = read_data(file_path)
    result_dask_20, groupby_time_dask_20 = group_week_dask(air_dask_20, num_workers=20)
    total_time_dask_20 = read_time_dask_20 + groupby_time_dask_20

    print("\nResults with Dask processing (20 processors):")
    display(round(result_dask_20))
    print("Total Time: {} seconds".format(total_time_dask_20))
    print("Read Time: {} seconds".format(read_time_dask_20))
    print("Groupby Time: {} seconds".format(groupby_time_dask_20))
    #return dataframe for visualization
    return result_dask_20, total_time_dask_10, total_time_dask_20, total_time_seq_1b

if __name__ == '__main__':
    file_path = "data/Trips_Full Data.csv"
    results_1b, total_time_dask_10, total_time_dask_20, total_time_seq_1b = compare_performance(file_path)
Results with sequential processing:
Trips 1-25 Miles Trips 1-3 Miles Trips 10-25 Miles Trips 100-250 Miles Trips 100+ Miles Trips 25-100 Miles Trips 25-50 Miles Trips 250-500 Miles Trips 3-5 Miles Trips 5-10 Miles Trips 50-100 Miles Trips 500+ Miles
Week
32 1.015555e+09 369476657.0 231078511.0 6850130.0 12122473.0 88037455.0 69159131.0 1829242.0 181555834.0 233444464.0 18878323.0 3443101.0
Total Time: 11.28 seconds
Read Time: 11.04 seconds
Groupby Time: 0.24 seconds

Results with Dask processing (10 processors):
Trips 1-25 Miles Trips 1-3 Miles Trips 10-25 Miles Trips 100-250 Miles Trips 100+ Miles Trips 25-100 Miles Trips 25-50 Miles Trips 250-500 Miles Trips 3-5 Miles Trips 5-10 Miles Trips 50-100 Miles Trips 500+ Miles
Week
32 1.015555e+09 369476657.0 231078511.0 6850130.0 12122473.0 88037455.0 69159131.0 1829242.0 181555834.0 233444464.0 18878323.0 3443101.0
Total Time: 5.4 seconds
Read Time: 0.37 seconds
Groupby Time: 5.03 seconds

Results with Dask processing (20 processors):
Trips 1-25 Miles Trips 1-3 Miles Trips 10-25 Miles Trips 100-250 Miles Trips 100+ Miles Trips 25-100 Miles Trips 25-50 Miles Trips 250-500 Miles Trips 3-5 Miles Trips 5-10 Miles Trips 50-100 Miles Trips 500+ Miles
Week
32 1.015555e+09 369476657.0 231078511.0 6850130.0 12122473.0 88037455.0 69159131.0 1829242.0 181555834.0 233444464.0 18878323.0 3443101.0
Total Time: 16.240000000000002 seconds
Read Time: 10.66 seconds
Groupby Time: 5.58 seconds
In [14]:
melted_data = results_1b.reset_index().melt(id_vars='Week', var_name='Trip', value_name='Average Trips')

# Plot the transposed data
fig = px.bar(melted_data, x='Trip', y='Average Trips', color='Trip',
             title='Average Trips per Week for Different Trips',
             labels={'Week': 'Week', 'Average Trips': 'Average Trips'})
fig.update_layout(height=600, width=1200)
fig.show()

1 C - Filtering¶

In [15]:
import dask
import dask.dataframe as dd
from time import time

def read_data(file_path):
    #import data using Dask
    start_read_time = time()
    dtypes = {'County Name': 'object', 'State Postal Code': 'object', 'Number of Trips': 'float64',
       'Number of Trips 1-3': 'float64',
       'Number of Trips 10-25': 'float64',
       'Number of Trips 100-250': 'float64',
       'Number of Trips 25-50': 'float64',
       'Number of Trips 250-500': 'float64',
       'Number of Trips 3-5': 'float64',
       'Number of Trips 5-10': 'float64',
       'Number of Trips 50-100': 'float64',
       'Number of Trips <1': 'float64',
       'Number of Trips >=500': 'float64',
       'Population Not Staying at Home': 'float64',
       'Population Staying at Home': 'float64'}
    data = dd.read_csv(file_path, dtype=dtypes)
    end_read_time = time()
    read_time = round(end_read_time - start_read_time, 2)

    return data, read_time

def filter_data(data, num_workers=None):
    # Select analysis columns
    selected_columns = ['Date', 'Number of Trips 10-25', 'Number of Trips 50-100']
    data_selected = data[selected_columns]

    # Drop rows with missing entries
    data_selected = data_selected.dropna()

    # Record start time for filtering
    start_filter_time = time()

    # Filter data for > 100,000,000 people and 10-25 number of trips
    filtered_10_25 = data_selected[data_selected['Number of Trips 10-25'] > 100000000]

    # Filter data for > 10,000,000 people and 50-100 number of trips
    filtered_50_100 = data_selected[data_selected['Number of Trips 50-100'] > 10000000]

    # Compute the results using Dask delayed
    if num_workers is not None:
        dask.config.set(scheduler='synchronous')

        # Lazily compute the results using Dask delayed
        filtered_10_25 = dask.delayed(filtered_10_25.compute)()
        filtered_50_100 = dask.delayed(filtered_50_100.compute)()

    # Record end time for filtering
    end_filter_time = time()
    filter_time = round(end_filter_time - start_filter_time, 2)

    return filtered_10_25, filtered_50_100, filter_time

def main():
    file_path = "data/Trips_by_Distance.csv"

    # Processing sequentially
    data_seq, read_time_seq = read_data(file_path)
    filtered_10_25_seq, filtered_50_100_seq, filter_time_seq = filter_data(data_seq.compute(scheduler='synchronous'))
    total_time_seq = read_time_seq + filter_time_seq

    print("\nResults with sequential processing (synchronous scheduler):")
    print("\nRead Time (Sequential): {} seconds".format(read_time_seq))
    print("Filtering Time (Sequential): {} seconds".format(filter_time_seq))
    print("Total Time (Sequential): {} seconds".format(total_time_seq))

    # Processing with 10 processors
    data_10, read_time_10 = read_data(file_path)
    filtered_10_25_10, filtered_50_100_10, filter_time_10 = filter_data(data_10, num_workers=10)
    total_time_10 = read_time_10 + filter_time_10

    print("\nResults with 10 processors (synchronous scheduler):")
    print("\nRead Time (10 processors): {} seconds".format(read_time_10))
    print("Filtering Time (10 processors): {} seconds".format(filter_time_10))
    print("Total Time (10 processors): {} seconds".format(total_time_10))

    # Processing with 20 processors
    data_20, read_time_20 = read_data(file_path)
    filtered_10_25_20, filtered_50_100_20, filter_time_20 = filter_data(data_20, num_workers=20)
    total_time_20 = read_time_20 + filter_time_20

    print("\nResults with 20 processors (synchronous scheduler):")
    print("\nRead Time (20 processors): {} seconds".format(read_time_20))
    print("Filtering Time (20 processors): {} seconds".format(filter_time_20))
    print("Total Time (20 processors): {} seconds".format(total_time_20))

    # Return DataFrames for visualization
    return (
        filtered_10_25_seq, filtered_50_100_seq, total_time_seq,
        filtered_10_25_10, filtered_50_100_10, total_time_10,
        filtered_10_25_20, filtered_50_100_20, total_time_20
    )

if __name__ == '__main__':
    (
        filtered_10_25_seq, filtered_50_100_seq, total_time_seq,
        filtered_10_25_10, filtered_50_100_10, total_time_10,
        filtered_10_25_20, filtered_50_100_20, total_time_20
    ) = main()
Results with sequential processing (synchronous scheduler):

Read Time (Sequential): 0.05 seconds
Filtering Time (Sequential): 0.01 seconds
Total Time (Sequential): 0.060000000000000005 seconds

Results with 10 processors (synchronous scheduler):

Read Time (10 processors): 0.07 seconds
Filtering Time (10 processors): 0.01 seconds
Total Time (10 processors): 0.08 seconds

Results with 20 processors (synchronous scheduler):

Read Time (20 processors): 0.06 seconds
Filtering Time (20 processors): 0.01 seconds
Total Time (20 processors): 0.06999999999999999 seconds
In [16]:
#plot number of participants
compared = filtered_10_25_20.merge(filtered_50_100_20).compute()
In [17]:
fig = px.line(compared, x='Date', y=['Number of Trips 10-25', 'Number of Trips 50-100'],
              title='Filtered Trips with > 100,000,000 people and 10-25 or 50-100 Number of Trips')

# Update layout for better visualization
fig.update_layout(xaxis_title='Date', yaxis_title='Number of Trips', legend_title='Trip Categories')
fig.update_layout(legend=dict(
    orientation="h",
    yanchor="middle",
    y=1.02,
    xanchor="right",
    x=1
))
fig.update_layout(height=600, width=1200)
fig.show()
In [ ]:
 
Compare Time used for Sequential, 10 Processors, and 20 Processors¶
In [19]:
processors = ["10 Processors Q1.A", '20 Processors Q1.A',"Sequential Q1.A","10 Processors Q1.B",
              '20 Processors Q1.B',"Sequential Q1.B", "Sequential Q1.C","10 Processors Q1.C", '20 Processors Q1.C']
processing_time = [total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a,
                   total_time_dask_10, total_time_dask_20, total_time_seq_1b, total_time_seq, total_time_10, total_time_20]
In [42]:
# Define the processors and processing times
processors = ["10 Processors Q1.A", '20 Processors Q1.A', "Sequential Q1.A", "10 Processors Q1.B",
              '20 Processors Q1.B', "Sequential Q1.B", "Sequential Q1.C", "10 Processors Q1.C", '20 Processors Q1.C']
processors_level = ["10 Processors", '20 Processors', "Sequential", "10 Processors",
              '20 Processors', "Sequential", "Sequential", "10 Processors", '20 Processors']
processing_time = [total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a,
                   total_time_dask_10, total_time_dask_20, total_time_seq_1b, total_time_seq, total_time_10, total_time_20]

# Create a DataFrame
df = pd.DataFrame({'Processor': processors,'Level':processors_level, 'Processing Time (seconds)': processing_time})

# show processing times
df
Out[42]:
Processor Level Processing Time (seconds)
0 10 Processors Q1.A 10 Processors 52.90
1 20 Processors Q1.A 20 Processors 51.82
2 Sequential Q1.A Sequential 39.29
3 10 Processors Q1.B 10 Processors 5.40
4 20 Processors Q1.B 20 Processors 16.24
5 Sequential Q1.B Sequential 11.28
6 Sequential Q1.C Sequential 0.06
7 10 Processors Q1.C 10 Processors 0.08
8 20 Processors Q1.C 20 Processors 0.07
In [43]:
#df.to_csv("processing times.csv", index = False)
In [22]:
# Compare mean processing times
mean_times = pd.DataFrame(df.groupby('Level')['Processing Time (seconds)'].mean())
print("\nMean Processing Times:")
mean_times
Mean Processing Times:
Out[22]:
Processing Time (seconds)
Level
10 Processors 19.460000
20 Processors 22.710000
Sequential 16.876667
In [23]:
#bar chart for mean processing times
fig = px.bar(mean_times.reset_index(), x='Level', y='Processing Time (seconds)', 
             title='Mean Processing Times by Processing type',
             labels={'Processing Time (seconds)': 'Mean Processing Time (seconds)'})

# Update layout for better visualization
fig.update_layout(xaxis_title='Level', yaxis_title='Mean Processing Time (seconds)')

# Display the figure
fig.show()

1E - Modeling¶

In [24]:
data_distance = dd.read_csv('data/Trips_by_Distance.csv', assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
#subset data for use
selected_columns = [
        'Population Not Staying at Home',
        'Number of Trips <1', 'Number of Trips 1-3', 'Number of Trips 3-5',
        'Number of Trips 5-10', 'Number of Trips 10-25',
        'Number of Trips 25-50', 'Number of Trips 50-100',
        'Number of Trips 100-250', 'Number of Trips 250-500',
        'Number of Trips >=500', 'Week'
    ]

data_selected = data_distance[selected_columns]
In [25]:
###test for linearity
# Compute Pearson correlation coefficients
correlation_y_x = data_selected.corr(method='pearson').compute()
In [26]:
#heatmap for the correlation matrix
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_y_x, annot=True, cmap='coolwarm', fmt=".2f", linewidths=0.5)
plt.title('Pearson Correlation Coefficients - Linearity Test')
plt.show()
No description has been provided for this image

Since all the variables 'Number of Trips <1', 'Number of Trips 1-3', 'Number of Trips 3-5', 'Number of Trips 5-10', 'Number of Trips 10-25', 'Number of Trips 25-50', 'Number of Trips 50-100', 'Number of Trips 100-250', 'Number of Trips 250-500', 'Number of Trips >=500' are highly correlated with each other and with Population Not Staying at Home ( target variable), this will present the problem of multicollinearity if all variables are used in the model. Therefore only the variable "Number of Trips 50-100" which has the highes correlation is used along side week number as predictors.

In [27]:
%%time
#get the variables to use
vars = ['Population Not Staying at Home', 'Number of Trips 50-100', 'Week']
#subset
datax = data_selected[vars]
#drop nan
datax = datax.dropna().compute()
#get x
X = datax[['Number of Trips 50-100', 'Week']]
#get y 
y = datax['Population Not Staying at Home']
print("X Shape:", X.shape)
print("\ny Shape:", y.shape)
#obtain x and y values
dask_x = X.values
dask_y = y.values
X Shape: (1035625, 2)

y Shape: (1035625,)
CPU times: total: 6.62 s
Wall time: 11.4 s
In [28]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.3, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared = round(r2_score(y_test, y_pred), 4)
rmse = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared)
print("RMSE:", rmse)
R-Squared: 0.9783
RMSE: 1139104.8422
Increase Training Size¶
In [29]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.1, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared1 = round(r2_score(y_test, y_pred), 4)
rmse1 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared1)
print("RMSE:", rmse1)
R-Squared: 0.9754
RMSE: 1201830.882
In [ ]:
 
In [30]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.2, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared2 = round(r2_score(y_test, y_pred), 4)
rmse2 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared2)
print("RMSE:", rmse2)
R-Squared: 0.9759
RMSE: 1192543.3382
Reduce the training size¶
In [31]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.4, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared3 = round(r2_score(y_test, y_pred), 4)
rmse3 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared3)
print("RMSE:", rmse3)
R-Squared: 0.9783
RMSE: 1114474.4401
Plot actual vs predicted¶
In [32]:
# Convert Dask DataFrames to Pandas DataFrames for plotting
y_test_pd = y_test
y_pred_pd = y_pred

# Plot actual vs. predicted values
plt.scatter(y_test_pd, y_pred_pd, alpha=0.5)
plt.title('Actual vs. Predicted Values')
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.grid()
plt.show()
No description has been provided for this image
Compare performance by data size¶
In [33]:
#data size
data_size = ['60%', '70%', '80%','90%']
r_squareds = [r_squared, r_squared1, r_squared2, r_squared3]
#rmses
rmses = [rmse, rmse1, rmse2, rmse3]
#dataframe
df = pd.DataFrame({
    'Data Size': data_size,
    'R-Squared': r_squareds,
    'RMSE': [round(item, 2) for item in rmses]
})

# Sort DataFrame by RMSE
df.sort_values(by='RMSE')
Out[33]:
Data Size R-Squared RMSE
3 90% 0.9783 1114474.44
0 60% 0.9783 1139104.84
2 80% 0.9759 1192543.34
1 70% 0.9754 1201830.88
In [34]:
#df.sort_values(by='RMSE').to_csv("performance.csv", index = False)

1F - Visualization¶

In [35]:
#read and process data
full_data, time_taken = read_data("data/Trips_Full Data.csv")
#confirm data is processed with no missing values
missing_full = full_data.isnull().sum().compute()
missing_full
Out[35]:
Month of Date                 0
Week of Date                  0
Year of Date                  0
Level                         0
Date                          0
Week Ending Date              0
Trips <1 Mile                 0
People Not Staying at Home    0
Population Staying at Home    0
Trips                         0
Trips 1-25 Miles              0
Trips 1-3 Miles               0
Trips 10-25 Miles             0
Trips 100-250 Miles           0
Trips 100+ Miles              0
Trips 25-100 Miles            0
Trips 25-50 Miles             0
Trips 250-500 Miles           0
Trips 3-5 Miles               0
Trips 5-10 Miles              0
Trips 50-100 Miles            0
Trips 500+ Miles              0
dtype: int64
Visualize to check for distribution¶
In [36]:
boxplot_variables = ['People Not Staying at Home', 'Population Staying at Home',
                       ]

# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]

# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')

# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Trip-related Variables')

# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')

# Show the figure
fig.show()
In [37]:
fig = px.line(full_data.compute(), x='Date', y='Trips <1 Mile', title='Number of participants making trips with < Mile Over Time')

# Update layout for better visualization
fig.update_layout(xaxis_title='Date', yaxis_title='Trips')

# Display the figure
fig.show()
In [38]:
boxplot_variables = ['Trips <1 Mile', 'Trips 1-25 Miles', 'Trips 1-3 Miles','Trips 10-25 Miles','Trips 3-5 Miles',
                      'Trips 5-10 Miles','Trips 25-50 Miles'
                       ]

# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]

# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')

# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Short Trips')

# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')

# Show the figure
fig.show()
In [40]:
boxplot_variables = ['Trips 100-250 Miles', 'Trips 100+ Miles', 'Trips 25-100 Miles',
                       'Trips 250-500 Miles',  'Trips 50-100 Miles', 'Trips 500+ Miles'
                       ]

# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]

# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')

# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Long Trips')

# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')

# Show the figure
fig.show()